/*
 *  Filename:lsv_gc.c
 *  Description:
 *
 *  Created on: 2017年3月17日
 *  Author: Asdf(825674301)
 */

#include <lsv.h>
#include "list.h"
#include "core.h"
#include "schedule.h"
#include "volume_proto.h"

#include "lsv_log.h"
#include "lsv_help.h"
#include "lsv_volume.h"
#include "lsv_volume_gc.h"
#include "lsv_bitmap.h"
#include "lsv_rcache.h"
#include "lsv_wbuffer.h"
#include "lsv_gc_internal.h"
#include "lsv_gc.h"

extern uint32_t lsv_feature;

// Key function handed to hash_create_table() for logctrl_tab:
// reads the 32-bit chunk id that _chunkid points at and returns it as the key.
static inline uint32_t __chkid2key(const void *_chunkid) {
        const uint32_t *chunk_key = _chunkid;

        return *chunk_key;
}

/**
 * Release a log chunk whose pages are all inactive.
 *
 * Removes the chunk's logctrl from logctrl_tab, asserts that the removal
 * succeeded and that every page of the chunk is inactive, calls
 * lsv_rcache_clean() for the chunk, frees the logctrl and the chunk, and
 * finally decrements gc_context->total_chunk_num.
 *
 * @param lsv_info   volume handle
 * @param log_chkid  id of the log chunk to release
 */
void lsv_gc_log_free(lsv_volume_proto_t *lsv_info, lsv_u32_t log_chkid) {
        lsv_log_info_t *linfo = lsv_info->log_info;
        lsv_gc_context_t *ctx = linfo->gc_context;
        lsv_gc_logctrl_t *victim = NULL;
        int rc = 0;

        rc = hash_table_remove(ctx->logctrl_tab, &log_chkid, (void **) &victim);

        // the whole chunk is reclaimed: it must be tracked and fully inactive
        YASSERT(0 == rc);
        YASSERT(victim->head.page_count == victim->lp_bm.inactive_page_count);

#ifdef LSV_GC_STATISTICS
        {
                lsv_gc_statistics_t *stats =
                        &((lsv_log_info_t*) lsv_info->log_info)->gc_context->stat_info;

                stats->storage_pages -= victim->head.page_count;
                stats->merge_free_count++;
                // DINFO_LOGGC_STATISTICS(stats);
        }
#endif

        // clean the read cache for this chunk before the chunk goes away
        lsv_rcache_clean(lsv_info, log_chkid);

        // release the in-memory control block, then the chunk itself
        lsv_gc_logctrl_free(lsv_info, victim);
        lsv_volume_chunk_free(lsv_info, LSV_LOG_LOG_STORAGE_TYPE, log_chkid);

        ctx->total_chunk_num--;
}

/**
 * Hand a freshly written log chunk to the GC subsystem.
 *
 * Does nothing (returns 0) when the LSV_FEATURE_GC bit is not set in
 * lsv_feature. Otherwise the chunk is appended to the check queue, a GC task
 * of type LSV_GC_TYPE_LOOP is requested, total_chunk_num is incremented and
 * lsv_gc_stat() is called.
 *
 * @param lsv_info   volume handle
 * @param log_proto  description of the log chunk to track
 * @return 0 on success, or the negative value returned by
 *         lsv_gc_add_to_check_queue() / lsv_gc()
 */
int lsv_gc_add(lsv_volume_proto_t *lsv_info, lsv_log_proto_t *log_proto) {
        int ret;
        lsv_log_info_t *linfo;
        lsv_gc_context_t *ctx;

        if (!(lsv_feature & LSV_FEATURE_GC)) {
                return 0;
        }

        ret = lsv_gc_add_to_check_queue(lsv_info, log_proto, LSV_CHECK_QUEUE_ADD_TYPE_TAIL);
        if (ret < 0) {
                DERROR("log gc add err, errno:%d\n", ret);
                GOTO(err_ret, ret);
        }

#ifdef LSV_GC_STATISTICS
        {
                lsv_gc_statistics_t *stats =
                        &((lsv_log_info_t*) lsv_info->log_info)->gc_context->stat_info;

                stats->real_pages += log_proto->head->page_count;
                stats->storage_pages += log_proto->head->page_count;
                // DINFO_LOGGC_STATISTICS(stats);
        }
#endif

        // ret = lsv_log_gc(lsv_info, LSV_GC_TYPE_NORMAL);
        ret = lsv_gc(lsv_info, LSV_GC_TYPE_LOOP);
        if (ret < 0) {
                DERROR("run gc err,errno:%d\n", ret);
                GOTO(err_ret, ret);
        }

        // the chunk is only counted once the GC request has been accepted
        linfo = lsv_info->log_info;
        ctx = linfo->gc_context;
        ctx->total_chunk_num++;

        lsv_gc_stat(lsv_info);

        return 0;
err_ret:
        return ret;
}

/**
 * Create a logctrl for a log chunk and put it on the GC check queue.
 *
 * If the chunk id is already present in logctrl_tab a warning is logged and
 * the function returns 0 without touching the queue. Otherwise a logctrl is
 * created, inserted into logctrl_tab and queued: a non-zero add_type uses
 * _lsv_gc_check_queue_offer(), zero uses _lsv_gc_check_queue_push().
 *
 * @param lsv_info   volume handle
 * @param log_proto  log chunk description; its chunk_id must not be 0
 * @param add_type   selects offer (non-zero) or push (zero)
 * @return 0 on success or when the chunk is already tracked; the negative
 *         value from lsv_gc_logctrl_create() / hash_table_insert() otherwise
 */
int lsv_gc_add_to_check_queue(lsv_volume_proto_t *lsv_info, lsv_log_proto_t* log_proto, lsv_u8_t add_type) {
        lsv_log_info_t *linfo = lsv_info->log_info;
        lsv_gc_context_t *ctx = linfo->gc_context;
        lsv_gc_logctrl_t *entry = NULL;
        int ret = 0;

        YASSERT(0 != log_proto->chunk_id);

        entry = (lsv_gc_logctrl_t*) hash_table_find(ctx->logctrl_tab, &log_proto->chunk_id);
        if (entry != NULL) {
                // already tracked: leave with ret == 0
                DWARN("chunk is in check queue, chunk_id %u\n", log_proto->chunk_id);
                GOTO(ERR0, ret);
        }

        // @malloc ERR1
        ret = lsv_gc_logctrl_create(lsv_info, 0, *log_proto, &entry);
        if (ret < 0) {
                DERROR("create logctrl fail, ret %d\n", ret);
                GOTO(ERR0, ret);
        }

        ret = hash_table_insert(ctx->logctrl_tab, entry, &entry->chunk_id, 0);
        if (ret < 0) {
                DERROR("put logctrl_tab fail, ret %d\n", ret);
                GOTO(ERR1, ret);
        }

        if (0 == add_type) {
                _lsv_gc_check_queue_push(&ctx->check_queue, entry);
        } else {
                _lsv_gc_check_queue_offer(&ctx->check_queue, entry);
        }

        return 0;
ERR1:
        lsv_gc_logctrl_free(lsv_info, entry);
ERR0:
        return ret;
}

/**
 * Run one GC iteration: optionally move one logctrl from the check queue to
 * the gc heap, then let the gc heap do its work.
 *
 * A logctrl is polled only when the check queue is non-empty, the POLL bit is
 * set in type, and it is not the case that both the queue is below
 * check_queue_size and the POLLFULL bit is set. If lsv_gc_check() or
 * lsv_gc_heap_insert() fails, the logctrl is offered back to the check queue
 * and the step ends with return value 0.
 *
 * @param lsv_info     volume handle
 * @param type         bit mask of LSV_GC_TYPE_BIT_* flags
 * @param is_continue  out: 1 when the LOOP bit is set and this step either
 *                     polled a logctrl or produced a new one; 0 otherwise
 *                     (always 0 on any early exit)
 * @return 0, or the negative value returned by lsv_gc_heap_do_gc()
 */
static int lsv_gc_one_step(lsv_volume_proto_t *lsv_info, lsv_u8_t type, lsv_s32_t *is_continue) {
        lsv_log_info_t *linfo = lsv_info->log_info;
        lsv_gc_context_t *ctx = linfo->gc_context;
        lsv_gc_logctrl_t *polled = NULL;
        lsv_gc_logctrl_t *merged = NULL;
        int ret = 0;

        // Examine the check queue and move a suitable logctrl into the heap;
        // if that fails, the logctrl goes back onto the check queue.
        if ((ctx->check_queue.count > 0) && (type & LSV_GC_TYPE_BIT_POLL) &&
            ((ctx->check_queue.count >= ctx->check_queue_size) ||
             !(type & LSV_GC_TYPE_BIT_POLLFULL))) {
                _lsv_gc_check_queue_poll(&ctx->check_queue, &polled);

                ret = lsv_gc_check(lsv_info, polled);
                if (ret < 0) {
                        DERROR("gc check failed, ret %d\n", ret);
                        _lsv_gc_check_queue_offer(&ctx->check_queue, polled);
                        ret = 0;
                        goto ERR0;
                }

                // add to gc_heap
                ret = lsv_gc_heap_insert(lsv_info, polled);
                if (ret < 0) {
                        DERROR("gc_heap insert failed, ret %d\n", ret);
                        _lsv_gc_check_queue_offer(&ctx->check_queue, polled);
                        ret = 0;
                        goto ERR0;
                }
        }

        // Process the gc heap, merging into a new logctrl
        ret = lsv_gc_heap_do_gc(lsv_info, type, &merged);
        if (ret < 0) {
                DERROR("gcpop failed, ret %d\n", ret);
                goto ERR0;
        }

        // keep looping only if something happened and the caller asked for it
        if ((NULL != merged || NULL != polled) && (type & LSV_GC_TYPE_BIT_LOOP)) {
                *is_continue = 1;
        } else {
                *is_continue = 0;
        }

        return 0;
ERR0:
        *is_continue = 0;
        return ret;
}

/**
 * Body of a scheduled GC task (entry point passed to schedule_task_new()).
 *
 * Bumps stat_info.task_count for the lifetime of the task, runs one GC step
 * and, if the step asks to continue, requests a follow-up task via lsv_gc().
 * The argument block is freed here when its is_free flag is set.
 *
 * task_count is decremented on the failure path as well as on success.
 * lsv_gc() refuses to schedule new tasks once task_count reaches
 * TASK_MAX / 3, so a failed step that left the counter inflated would
 * eventually stop GC from ever being scheduled again.
 *
 * @param arg  pointer to a lsv_gc_arg_t
 */
static void lsv_gc_(void *arg) {
        int ret = 0;
        lsv_log_info_t *log_info = NULL;
        lsv_gc_context_t *gc_context = NULL;
        lsv_gc_arg_t * gc_arg;
        lsv_volume_proto_t *lsv_info;
        lsv_u8_t type;
        lsv_s32_t is_continue = 0;

        gc_arg = arg;
        lsv_info = gc_arg->lsv_info;
        type = gc_arg->type;

        log_info = lsv_info->log_info;
        gc_context = log_info->gc_context;

        gc_context->stat_info.task_count++;

        // PRINT_GC_STAT(lsv_info);

        ret = lsv_gc_one_step(lsv_info, type, &is_continue);
        if (unlikely(ret))
                GOTO(ERR0, ret);

        if (is_continue) {
                lsv_gc(lsv_info, type);
        }

        gc_context->stat_info.task_count--;

        if (gc_arg->is_free) {
                free(gc_arg);
        }

        return;
ERR0:
        DERROR("gc err.err:%d", ret);

        // balance the increment above, otherwise the counter leaks on failure
        gc_context->stat_info.task_count--;

        if (gc_arg->is_free) {
                free(gc_arg);
        }
}

/**
 * Request an asynchronous GC task.
 *
 * When stat_info.task_count is below TASK_MAX / 3, allocates a lsv_gc_arg_t
 * (marked is_free so that the task releases it) and schedules lsv_gc_ with it.
 * When the limit is reached nothing is scheduled and 0 is returned.
 *
 * @param lsv_info  volume handle
 * @param type      GC type flags forwarded to the task
 * @return 0 on success (including "nothing scheduled"), -ENOMEM if the
 *         argument block cannot be allocated
 */
int lsv_gc(lsv_volume_proto_t *lsv_info, lsv_u8_t type) {
        lsv_log_info_t *linfo = lsv_info->log_info;
        lsv_gc_context_t *ctx = linfo->gc_context;
        lsv_gc_arg_t *task_arg = NULL;
        int ret = 0;

        // TODO check if gc task too many, don't create a new task
        if (!(ctx->stat_info.task_count < TASK_MAX / 3)) {
                return 0;
        }

        // @malloc ERR1
        ret = ymalloc((void **)&task_arg, sizeof(lsv_gc_arg_t));
        if (unlikely(ret)) {
                ret = -ENOMEM;
                DERROR("malloc gc_arg failed, errno %d\n", ret);
                GOTO(ERR0, ret);
        }

        task_arg->msg_type = LSV_GC_ARG_TPYE_GC;
        task_arg->lsv_info = lsv_info;
        task_arg->type = type;
        task_arg->is_free = 1;          // the task owns and frees the argument

        DINFO_NOP("create a log gc task, type %u\n", type);
        schedule_task_new("lsv_gc_", lsv_gc_, task_arg, -1);

        return 0;
ERR0:
        return ret;
}

/**
 * Allocate the GC context for a volume and initialise its in-memory state:
 * check queue, valueup queue, gc heap, logctrl hash table, task queues,
 * old_storage bookkeeping and (with IS_STORAGE_BITMAP) the bitmap slots.
 *
 * The context is zero-allocated so that fields which are not explicitly
 * initialised below (for example bitmaps[] when IS_STORAGE_BITMAP is not
 * defined) start from a known state instead of indeterminate memory.
 *
 * On success log_info->gc_context points at the new context; on failure it
 * is left NULL and nothing stays allocated.
 *
 * @param lsv_info  volume handle
 * @return 0 on success, -ENOMEM if the context or the logctrl table cannot
 *         be created
 */
static int __init_gc_module(lsv_volume_proto_t *lsv_info) {
        int ret = 0;
        lsv_log_info_t *log_info = NULL;
        lsv_gc_context_t *gc_context = NULL;
        char _hash_table_name[LSV_MAX_STRING_LEN];

        log_info = lsv_info->log_info;
        log_info->gc_context = NULL;

        // @malloc ERR1
        gc_context = calloc(1, sizeof(*gc_context));
        if (NULL == gc_context) {
                ret = -ENOMEM;
                DERROR("malloc gc_context failed,errno:%d\n", ret);
                goto ERR0;
        }
        log_info->gc_context = gc_context;

        // init check_queue
        // lsv_lock_init(&gc_context->check_queue.lock);

        gc_context->check_queue_size = _lsv_gc_check_queue_size(lsv_info);
        gc_context->total_chunk_num = 0;

        INIT_LIST_HEAD(&gc_context->check_queue.queue);
        gc_context->check_queue.count = 0;

        // init valueup_queue
        // lsv_lock_init(&gc_context->valueup_queue.lock);
        gc_context->valueup_queue.count = 0;
        INIT_LIST_HEAD(&gc_context->valueup_queue.queue);

        // init gc_heap (the heap[] slots are already zeroed by calloc)
        // lsv_lock_init(&gc_context->gc_heap.lock);
        gc_context->gc_heap.count = 0;

        // init logctrl_tab
        // lsv_rwlock_init(&gc_context->log_ctrl_tab_lock);
        // TODO ino
        // bounded formatting: the name can never overrun _hash_table_name
        snprintf(_hash_table_name, sizeof(_hash_table_name),
                 LSV_GC_LOGCTRL_TAB_NAME_BASE"%u", 1);
        gc_context->logctrl_tab = hash_create_table(__lsv_gc_logctrl_cmp, __chkid2key, _hash_table_name);
        if (NULL == gc_context->logctrl_tab) {
                // NOTE(review): assumes hash_create_table() reports failure by
                // returning NULL - confirm against its implementation
                ret = -ENOMEM;
                DERROR("create logctrl_tab failed,errno:%d\n", ret);
                goto ERR1;
        }

        //init task_queue
        co_mq_init(&gc_context->gc_queue, "gc_task");
        co_mq_init(&gc_context->fullgc_queue, "fullgc_task");
        gc_context->gc_count = 0;

        // init old_storage
        lsv_lock_init(&gc_context->old_storage.lock);

        gc_context->old_storage.chunk_count = 0;
        gc_context->old_storage.gc_chkid = LSV_CHUNK_NULL;
        gc_context->old_storage.ready_chkid = LSV_CHUNK_NULL;
        gc_context->old_storage.ready_chunk.count = 0;
        gc_context->old_storage.ready_chunk.next_chkid = LSV_CHUNK_NULL;
        gc_context->old_storage.all_gc_value = 0;

#ifdef IS_STORAGE_BITMAP
        //create storage
        gc_context->bitmap_is_open = 1;

        for (lsv_u32_t i = 0; i < LSV_GC_BITMAP_CHUNK_NUM; i++) {
                lsv_lock_init(&gc_context->bitmap_lock[i]);
                gc_context->bitmaps[i] = NULL;
        }
#endif

        // __lsv_gc_old_storage_head_set(lsv_info);
        // __lsv_gc_storage_head_set(lsv_info);

        lsv_gc_stat_init(gc_context);

        DINFO("gc_create check_queue_size %u\n", gc_context->check_queue_size);
        return 0;
ERR1:
        // nothing but the context itself has been set up at this point
        free(gc_context);
ERR0:
        log_info->gc_context = NULL;
        return ret;
}

/**
 * Load the persisted GC state of a volume into the (already initialised)
 * GC context.
 *
 * Reads the GC page at lsv_info->u.gc_os_page_id from the primary chunk,
 * takes the old_storage meta from offset LSV_GC_OS_HEAD, reads the ready
 * chunk list from old_storage.ready_chkid, recomputes total_chunk_num, and
 * loads every bitmap chunk listed at offset LSV_GC_BITMAP_HEAD into
 * gc_context->bitmaps[].
 *
 * Failures of the ready-chunk read and of a bitmap chunk read are treated as
 * fatal (YASSERT(0)).
 *
 * @param lsv_info  volume handle
 * @return 0 on success, the negative value from the GC page read, or -ENOMEM
 *         when a bitmap record cannot be allocated
 */
static int __load_gc_page(lsv_volume_proto_t *lsv_info) {
        int ret;
        lsv_log_info_t *log_info = NULL;
        lsv_gc_context_t *gc_context = NULL;
        int8_t buff[LSV_PAGE_SIZE];
        lsv_u32_t i, j;
        lsv_u32_t ready_chunk_count, bitmap_chkid;
        lsv_gc_bitmap_record_t *bitmap_record = NULL;

        log_info = lsv_info->log_info;
        gc_context = log_info->gc_context;

        // read info
        ret = lsv_volume_chunk_read_data(lsv_info, LSV_THIS_VOL_INO, LSV_PRIM_CHUNK_ID,
                                        lsv_info->u.gc_os_page_id * LSV_PAGE_SIZE,
                                        LSV_PAGE_SIZE,
                                        buff);
        if (ret < 0) {
                DERROR("upload volgc stroage err,errno:%d\n", ret);
                goto ERR0;
        }

        lsv_gc_old_storage_meta_t *meta = (lsv_gc_old_storage_meta_t *)(buff + LSV_GC_OS_HEAD);

        YASSERT(meta->gc_chkid > 0);
        YASSERT(meta->ready_chkid > 0);

        gc_context->old_storage.chunk_count = meta->chunk_count;
        gc_context->old_storage.gc_chkid = meta->gc_chkid;
        gc_context->old_storage.ready_chkid = meta->ready_chkid;
        ready_chunk_count = meta->ready_chunk_count;

        // NOTE(review): the read length is derived from the on-disk
        // ready_chunk_count; presumably it is bounded by the capacity of
        // old_storage.ready_chunk - confirm against the struct definition
        ret = lsv_volume_chunk_read_data(lsv_info,
                                        LSV_THIS_VOL_INO,
                                        gc_context->old_storage.ready_chkid,
                                        0,
                                        (2 + ready_chunk_count) * sizeof(lsv_u32_t),
                                        (lsv_s8_t *)&gc_context->old_storage.ready_chunk);
        if (ret < 0) {
                DERROR("upload old_storage err,errno:%d\n", ret);
                YASSERT(0);
        }

        YASSERT(gc_context->old_storage.ready_chunk.count == ready_chunk_count);
        YASSERT(gc_context->old_storage.ready_chunk.next_chkid == LSV_CHUNK_NULL);

        gc_context->total_chunk_num = gc_context->old_storage.ready_chunk.count +
                                      gc_context->old_storage.chunk_count * LSV_GC_OS_CHUNK_NUM;

        // create storage
        gc_context->bitmap_is_open = 1;

        lsv_gc_old_storage_bitmap_meta_t *os_meta = (lsv_gc_old_storage_bitmap_meta_t *)(buff + LSV_GC_BITMAP_HEAD);

        for (i = 0; i < LSV_GC_BITMAP_CHUNK_NUM; i++) {
                bitmap_chkid = os_meta->chunk_ids[i];
                DINFO("gc bitmap %u/%lu chunk_id %u\n", i, LSV_GC_BITMAP_CHUNK_NUM, bitmap_chkid);

                // load the log bitmap; a chunk id of 0 means "no bitmap in this slot"
                if (0 == bitmap_chkid) {
                        gc_context->bitmaps[i] = NULL;
                } else {
                        ret = ymalloc((void **)&bitmap_record, sizeof(lsv_gc_bitmap_record_t));
                        if (unlikely(ret)) {
                                ret = -ENOMEM;
                                DERROR("malloc bitmap_record failed,errno:%d\n", ret);
                                for (j = 0; j < i; j++) {
                                        lsv_lock_destroy(&gc_context->bitmap_lock[j]);
                                        if (gc_context->bitmaps[j]) {
                                                free(gc_context->bitmaps[j]);
                                                // do not leave a dangling pointer behind:
                                                // lsv_gc_delete() frees every non-NULL slot
                                                gc_context->bitmaps[j] = NULL;
                                        }
                                }
                                goto ERR0;
                        }

                        ret = lsv_volume_chunk_read(lsv_info, bitmap_chkid, (lsv_s8_t *)bitmap_record->bitmap);
                        if (ret < 0) {
                                DERROR("upload old_storage err, errno:%d\n", ret);
                                YASSERT(0);
                        }

                        bitmap_record->chunk_id = bitmap_chkid;

                        gc_context->bitmaps[i] = bitmap_record;
                }
        }

        lsv_gc_stat(lsv_info);

        return 0;
ERR0:
        return ret;
}

/**
 * Set up GC for a newly created volume.
 *
 * Initialises the in-memory GC context, allocates one chunk of type
 * LSV_GC_STORAGE_TYPE that serves as both the gc chunk and the ready chunk of
 * old_storage, then writes the old_storage head and the bitmap head.
 *
 * TODO: much of this is duplicated by the upload and recovery paths.
 *
 * NOTE(review): when the chunk allocation fails, the context created by
 * __init_gc_module() stays allocated and reachable through
 * log_info->gc_context - confirm that callers release it via lsv_gc_delete().
 *
 * @param lsv_info  volume handle
 * @return 0 on success, otherwise the error from __init_gc_module() or
 *         lsv_volume_chunk_malloc()
 */
int lsv_gc_create(lsv_volume_proto_t *lsv_info) {
        lsv_log_info_t *linfo = NULL;
        lsv_gc_context_t *ctx = NULL;
        lsv_u32_t storage_chkid;
        int ret;

        ret = __init_gc_module(lsv_info);
        if (unlikely(ret)) {
                GOTO(ERR0, ret);
        }

        linfo = lsv_info->log_info;
        ctx = linfo->gc_context;

        // @malloc ERR1
        ret = lsv_volume_chunk_malloc(lsv_info, LSV_GC_STORAGE_TYPE, &storage_chkid);
        if (ret < 0) {
                DERROR("malloc olg_storage_chunk failed,errno:%d\n", ret);
                goto ERR0;
        }

        // init old_storage (original author's note: chunk_id == 65);
        // the same chunk starts out as both the gc chunk and the ready chunk
        ctx->old_storage.ready_chkid = storage_chkid;
        ctx->old_storage.gc_chkid = storage_chkid;

        lsv_gc_old_storage_head_set(lsv_info);

        // init log bitmap
        lsv_gc_bitmap_set_head(lsv_info);

        return 0;
ERR0:
        return ret;
}

/**
 * Tear down the in-memory GC context of a volume.
 *
 * Destroys the old_storage lock, destroys logctrl_tab (freeing each entry
 * with __lsv_gc_logctrl_free), releases the bitmap locks and records when
 * IS_STORAGE_BITMAP is defined, frees the context and clears
 * log_info->gc_context. A NULL context is a no-op.
 *
 * @param lsv_info  volume handle
 * @return always 0
 */
int lsv_gc_delete(lsv_volume_proto_t *lsv_info) {
        lsv_log_info_t *linfo = lsv_info->log_info;
        lsv_gc_context_t *ctx = linfo->gc_context;
        int rc = 0;

        if (!ctx) {
                return rc;
        }

        // lsv_lock_destroy(&ctx->valueup_queue.lock);
        // lsv_lock_destroy(&ctx->check_queue.lock);
        // lsv_lock_destroy(&ctx->gc_heap.lock);
        lsv_lock_destroy(&ctx->old_storage.lock);
        // lsv_rwlock_destroy(&ctx->log_ctrl_tab_lock);
        hash_destroy_table(ctx->logctrl_tab, __lsv_gc_logctrl_free);

#ifdef IS_STORAGE_BITMAP
        //delete storage
        for (lsv_u32_t slot = 0; slot < LSV_GC_BITMAP_CHUNK_NUM; slot++) {
                lsv_lock_destroy(&ctx->bitmap_lock[slot]);
                free(ctx->bitmaps[slot]);       // free(NULL) is a no-op
        }
#endif

        free(ctx);
        linfo->gc_context = NULL;

        return rc;
}

/**
 * Move everything GC currently tracks in memory into old_storage.
 *
 * Commits pending valueup work, then passes the chunk id of every gc heap
 * entry and of every check queue entry to
 * lsv_gc_old_storage_remove_and_add(), emptying both containers, and finally
 * issues one more call with LSV_CHUNK_NULL and flag 1 to flush the ready
 * chunk.
 *
 * @param lsv_info  volume handle
 * @return always 0
 */
int lsv_gc_flush(lsv_volume_proto_t *lsv_info) {
        lsv_log_info_t *linfo = lsv_info->log_info;
        lsv_gc_context_t *ctx = linfo->gc_context;
        lsv_gc_check_queue_t *cq = &ctx->check_queue;
        lsv_gc_heap_t *heap = &ctx->gc_heap;
        lsv_gc_logctrl_t *polled = NULL;
        int slot;

        lsv_gc_valueup_commit(lsv_info);

        // store gc_heap
        DINFO("store gc_heap, count:%u\n", heap->count);
        slot = 0;
        while (slot < heap->count) {
                lsv_gc_old_storage_remove_and_add(lsv_info, heap->heap[slot]->chunk_id, 0);
                heap->heap[slot] = NULL;
                slot++;
        }
        heap->count = 0;

        //store check_queue
        DINFO("store check_queue, count %u\n", cq->count);
        for (; cq->count > 0;) {
                _lsv_gc_check_queue_poll(cq, &polled);
                lsv_gc_old_storage_remove_and_add(lsv_info, polled->chunk_id, 0);
        }
        YASSERT(cq->count == 0);

        // flush ready chunk
        lsv_gc_old_storage_remove_and_add(lsv_info, LSV_CHUNK_NULL, 1);

        //store old_storage
        //__lsv_gc_old_storage_head_set(lsv_info);

        return 0;
}

/**
 * Bring up GC for an existing volume: initialise the in-memory context and
 * then load the persisted GC page into it.
 *
 * @param lsv_info  volume handle
 * @return 0 on success, otherwise the error from __init_gc_module() or
 *         __load_gc_page()
 */
int lsv_gc_load(lsv_volume_proto_t *lsv_info) {
        int ret = __init_gc_module(lsv_info);

        if (unlikely(ret)) {
                GOTO(ERR0, ret);
        }

        ret = __load_gc_page(lsv_info);
        if (unlikely(ret)) {
                GOTO(ERR0, ret);
        }

        return 0;
ERR0:
        return ret;
}

/**
 * Re-queue every chunk that is marked in one GC bitmap record.
 *
 * Walks all LSV_CHUNK_SIZE bytes of gc_context->bitmaps[idx], derives a chunk
 * id for every set bit and hands the ids to
 * lsv_gc_add_to_check_queue_batch() in batches of at most
 * LSV_GC_CHECK_QUEUE_RECOVERY_ADD_TASK_MAX. The scan stops early once a
 * derived chunk id exceeds volgc_info->tail.
 *
 * The temporary id array is released before returning; it used to be leaked
 * on every call.
 *
 * @param lsv_info      volume handle
 * @param idx           index of the bitmap record in gc_context->bitmaps[]
 * @param bitmap_chkid  chunk id the record is expected to carry (asserted)
 * @return 0 on success or when the slot holds no record, -ENOMEM if the
 *         temporary id array cannot be allocated
 */
static int lsv_gc_recovery_storage_record_(lsv_volume_proto_t *lsv_info, lsv_u32_t idx, lsv_u32_t bitmap_chkid) {
        int ret = 0;
        lsv_log_info_t *log_info = NULL;
        lsv_gc_context_t *gc_context = NULL;
        lsv_volgc_info_t *volgc_info = NULL;
        lsv_gc_bitmap_record_t *bitmap_record = NULL;
        lsv_u32_t *recovery_chunk_id_array = NULL, recovery_chunk_count, recovery_chunk_id;
        lsv_u32_t i, j;
        lsv_u8_t bit;

        log_info = lsv_info->log_info;
        gc_context = log_info->gc_context;
        volgc_info = lsv_info->volgc_info;

        bitmap_record = gc_context->bitmaps[idx];
        if (bitmap_record == NULL) {
                return 0;
        }

        YASSERT(bitmap_record->chunk_id == bitmap_chkid);

        LSV_TEST_TIME_BEGIN(loggc_storage_record2);

        recovery_chunk_id_array = malloc(sizeof(*recovery_chunk_id_array) * LSV_GC_CHECK_QUEUE_RECOVERY_ADD_TASK_MAX);
        if (NULL == recovery_chunk_id_array) {
                ret = -ENOMEM;
                DERROR("malloc failed, ret %d\n", ret);
                goto ERR;
        }
        recovery_chunk_count = 0;

        // progress is logged once every `step` examined bits
        int step = 10000;
        int count = 0;

        // for every byte of the bitmap chunk
        for (i = 0; i < LSV_CHUNK_SIZE; i++) {
                // byte is all zero's
                if (!bitmap_record->bitmap[i]) {
                        continue;
                }

                for (j = 0; j < 8; j += LSV_GC_STORAGE_BIT_LEN) {
                        bit = (bitmap_record->bitmap[i] >> j) & ((lsv_u8_t) 0x01);
                        recovery_chunk_id = idx * LSV_GC_STORAGE_CHUNK_SIZE +
                                            i * (8 / LSV_GC_STORAGE_BIT_LEN) +
                                            j/LSV_GC_STORAGE_BIT_LEN;

                        if (count % step == 0) {
                                DINFO("%d chunk_id:%u bit1:%u bit2:%u\n", count,
                                      recovery_chunk_id,
                                      bitmap_record->bitmap[i], bit);
                        }

                        if (bit) {
                                if (count % step == 0) {
                                        DINFO("%d chunk_id:%u bit1:%u bit2:%u\n",
                                              count,
                                              recovery_chunk_id,
                                              bitmap_record->bitmap[i], bit);
                                }

                                recovery_chunk_id_array[recovery_chunk_count++] = recovery_chunk_id;
                                // batch is full: submit it and start a new one
                                if (LSV_GC_CHECK_QUEUE_RECOVERY_ADD_TASK_MAX == recovery_chunk_count) {
                                        lsv_gc_add_to_check_queue_batch(lsv_info,
                                                                        recovery_chunk_count,
                                                                        recovery_chunk_id_array,
                                                                        LSV_CHECK_QUEUE_ADD_TYPE_TAIL);
                                        recovery_chunk_count = 0;
                                }
                        }
                        count++;

                        // original author's note: "is this possible?"
                        if (recovery_chunk_id > volgc_info->tail) {
                                goto RET;
                        }
                }
        }

RET:
        // submit the last, partially filled batch
        if (recovery_chunk_count > 0) {
                lsv_gc_add_to_check_queue_batch(lsv_info,
                                                    recovery_chunk_count,
                                                    recovery_chunk_id_array,
                                                    LSV_CHECK_QUEUE_ADD_TYPE_TAIL);
        }

        // the array is reused across batches above, so it is still owned here
        free(recovery_chunk_id_array);

        LSV_TEST_TIME_END(loggc_storage_record2);

        return 0;
ERR:
        return ret;
}

/**
 * Task body for recovering one bitmap record (scheduled by lsv_gc_recovery).
 *
 * Runs lsv_gc_recovery_storage_record_() for the slot described by the
 * argument, stores a negative result through ctx->rc, decrements the shared
 * counter behind ctx->count and resumes ctx->wait_task when that counter
 * reaches zero. The argument block is freed when its is_free flag is set.
 *
 * @param arg  pointer to a lsv_gc_recovery_bitmap_record_arg_t
 */
static void __do_lsv_gc_recovery_storage_record(void *arg) {
        lsv_gc_recovery_bitmap_record_arg_t *job = arg;
        lsv_u32_t *pending = job->count;
        lsv_s32_t *shared_rc = job->rc;
        int ret;

        ret = lsv_gc_recovery_storage_record_(job->lsv_info, job->idx, job->bitmap_chkid);
        if (ret < 0) {
                DERROR("recovery_storage_record err,errno:%d\n,idx:%u\n", ret, job->idx);
                *shared_rc = ret;
        }
        *pending -= 1;

        DINFO("finish recovery_storage_record task, idx:%u\n", job->idx);
        // the last task to finish wakes the waiting task
        if (0 == *pending) {
                schedule_resume(job->wait_task, 0, NULL);
        }

        if (job->is_free) {
                free(job);
        }
}

/**
 * @brief Rebuild the GC check queue of a volume from its persisted bitmaps.
 *
 * Initialises the GC context, loads the GC page, then starts one task per
 * non-NULL bitmap record and yields until the last task resumes this one.
 * bitmap_is_open is cleared while the tasks run and set again afterwards.
 *
 * Original author's note: the startup path does not need to wait for GC
 * recovery to finish, so this could be done asynchronously.
 *
 * If any task reports a failure, the GC context is deleted and that task's
 * error code is returned. (Previously `ret` was still 0 at that point, so the
 * failure was reported as success.)
 *
 * @param lsv_info  volume handle
 * @return 0 on success, otherwise the error from __init_gc_module(),
 *         __load_gc_page() or a failed recovery task
 */
int lsv_gc_recovery(lsv_volume_proto_t *lsv_info) {
        int ret = 0;
        lsv_log_info_t *log_info = NULL;
        lsv_gc_context_t *gc_context = NULL;
        lsv_gc_recovery_bitmap_record_arg_t * rsr_arg;
        lsv_u8_t wait_hehe;
        task_t wait_task;
        lsv_u32_t task_count;
        lsv_s32_t task_rc;

        ret = __init_gc_module(lsv_info);
        if (unlikely(ret)) {
                GOTO(ERR0, ret);
        }

        ret = __load_gc_page(lsv_info);
        if (unlikely(ret)) {
                GOTO(ERR0, ret);
        }

        log_info = lsv_info->log_info;
        gc_context = log_info->gc_context;

        DINFO("loggc recovery begin\n");
        LSV_TEST_TIME_BEGIN(gc_recovery);

        //create storage
        gc_context->bitmap_is_open = 0;
        task_count = 0;
        task_rc = 0;
        wait_hehe = 1;          // 1 until wait_task has been captured

        for (uint32_t i = 0; i < LSV_GC_BITMAP_CHUNK_NUM; i++) {
                if (gc_context->bitmaps[i]) {
                        if (wait_hehe){
                                // TODO schedule_task_get must not be called more
                                // than once between two yields
                                wait_task = schedule_task_get();
                                wait_hehe = 0;
                        }

                        ret = ymalloc((void **)&rsr_arg, sizeof(lsv_gc_recovery_bitmap_record_arg_t));
                        if (unlikely(ret)) {
                                YASSERT(0);
                        }

                        task_count++;

                        // task_count, task_rc and wait_task live on this stack
                        // frame and are shared with every task by pointer
                        rsr_arg->lsv_info = lsv_info;
                        rsr_arg->idx = i;
                        rsr_arg->bitmap_chkid = gc_context->bitmaps[i]->chunk_id;
                        rsr_arg->count = &task_count;
                        rsr_arg->wait_task = &wait_task;
                        rsr_arg->rc = &task_rc;
                        rsr_arg->is_free = 1;

                        // fork-
                        schedule_task_new("__do_lsv_gc_recovery_storage_record",
                                          __do_lsv_gc_recovery_storage_record,
                                          rsr_arg,
                                          -1);
                        DINFO("create recovery_storage_record task, idx:%u\n", rsr_arg->idx);
                }
        }

        // -and join
        DINFO("count %u\n", task_count);
        if (task_count > 0) {
                schedule_yield(__FUNCTION__, NULL, NULL);
        }

        if (task_rc < 0) {
                // propagate the task's error; ret holds 0 from the last
                // successful call and must not be returned as-is
                ret = task_rc;
                DERROR("recovery_storage_record task err, ret %d\n", ret);
                goto ERR1;
        }

        gc_context->bitmap_is_open = 1;
        LSV_TEST_TIME_END(gc_recovery);

        DINFO("chunk_count %u ready_count %u\n",
              gc_context->old_storage.chunk_count,
              gc_context->old_storage.ready_chunk.count);

        return 0;
ERR1:
        lsv_gc_delete(lsv_info);
ERR0:
        return ret;
}

/**
 * Attribute hook for the GC module; currently a placeholder that ignores its
 * argument.
 *
 * @param lsv_info  volume handle (unused)
 * @return always 0
 */
int lsv_gc_setattr(lsv_volume_proto_t *lsv_info) {
        (void) lsv_info;

        return 0;
}
